GatewayUDPConnector服务,接收MPNS的消息种类如下:1
2
3
4
5//GatewayUDPConnector#init()
//PUSH消息
messageDispatcher.register(Command.GATEWAY_PUSH, () -> new GatewayPushHandler(mPushServer.getPushCenter()));
//踢人消息
messageDispatcher.register(Command.GATEWAY_KICK, () -> new GatewayKickUserHandler(mPushServer.getRouterCenter()));
处理PUSH消息:1
2
3
4
5
6
7
8
9
10
11
12
13
14public final class GatewayPushHandler extends BaseMessageHandler<GatewayPushMessage> {
private final PushCenter pushCenter;
public GatewayPushHandler(PushCenter pushCenter) {
this.pushCenter = pushCenter;
}
public GatewayPushMessage decode(Packet packet, Connection connection) {
return new GatewayPushMessage(packet, connection);
}
public void handle(GatewayPushMessage message) {
pushCenter.push(message);
}
}
消息处理,走的是 BroadcastPushTask 任务1
2
3
4
5
6
7
8
9
10
11
12
13
14//PushCenter.java
public void push(IPushMessage message) {
if (message.isBroadcast()) {
FlowControl flowControl = (message.getTaskId() == null)
? new FastFlowControl(limit, max, duration)
: new RedisFlowControl(message.getTaskId(), max);
//添加到自定义的PushTaskTimer线程池中执行该任务
addTask(new BroadcastPushTask(mPushServer, message, flowControl));
} else {
//添加到GatewayUDPConnector的netty work 线程池中执行该任务
addTask(new SingleUserPushTask(mPushServer, message, globalFlowControl));
}
}
自定义的PushTaskTimer线程池:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39//PushCenter.java
protected void doStart(Listener listener) throws Throwable {
this.pushListener = PushListenerFactory.create();
this.pushListener.init(mPushServer);
if (CC.mp.net.udpGateway() || CC.mp.thread.pool.push_task > 0) {
executor = new CustomJDKExecutor(mPushServer.getMonitor().getThreadPoolManager().getPushTaskTimer());
} else {//实际情况使用EventLoo并没有更快,还有待测试
executor = new NettyEventLoopExecutor();
}
MBeanRegistry.getInstance().register(new PushCenterBean(taskNum), null);
ackTaskQueue.start();
logger.info("push center start success");
listener.onSuccess();
}
/**
* UDP 模式使用自定义线程池
*/
private static class CustomJDKExecutor implements PushTaskExecutor {
private final ScheduledExecutorService executorService;
private CustomJDKExecutor(ScheduledExecutorService executorService) {
this.executorService = executorService;
}
public void shutdown() {
executorService.shutdown();
}
public void addTask(PushTask task) {
executorService.execute(task);
}
public void delayTask(long delay, PushTask task) {
executorService.schedule(task, delay, TimeUnit.NANOSECONDS);
}
}
线程池调度任务时,执行 run() 方法1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117public final class BroadcastPushTask implements PushTask {
private final long begin = System.currentTimeMillis();
private final AtomicInteger finishTasks = new AtomicInteger(0);
private final TimeLine timeLine = new TimeLine();
private final Set<String> successUserIds = new HashSet<>(1024);
private final FlowControl flowControl;
private final IPushMessage message;
private final Condition condition;
private final MPushServer mPushServer;
//使用Iterator, 记录任务遍历到的位置,因为有流控,一次任务可能会被分批发送,而且还有在推送过程中上/下线的用户
private final Iterator<Map.Entry<String, Map<Integer, LocalRouter>>> iterator;
public BroadcastPushTask(MPushServer mPushServer, IPushMessage message, FlowControl flowControl) {
this.mPushServer = mPushServer;
this.message = message;
this.flowControl = flowControl;
this.condition = message.getCondition();
this.iterator = mPushServer.getRouterCenter().getLocalRouterManager().routers().entrySet().iterator();
this.timeLine.begin("push-center-begin");
}
public void run() {
flowControl.reset();
boolean done = broadcast();
if (done) {//done 广播结束
if (finishTasks.addAndGet(flowControl.total()) == 0) {
report();
}
} else {//没有结束,就延时进行下次任务 TODO 考虑优先级问题
mPushServer.getPushCenter().delayTask(flowControl.getDelay(), this);
}
flowControl.end(successUserIds.toArray(new String[successUserIds.size()]));
}
private boolean broadcast() {
try {
iterator.forEachRemaining(entry -> {
String userId = entry.getKey();
entry.getValue().forEach((clientType, router) -> {
Connection connection = router.getRouteValue();
if (checkCondition(condition, connection)) {//1.条件检测
if (connection.isConnected()) {
if (connection.getChannel().isWritable()) { //检测TCP缓冲区是否已满且写队列超过最高阀值
PushMessage
.build(connection)
.setContent(message.getContent())
.send(future -> operationComplete(future, userId));
//4. 检测qps, 是否超过流控限制,如果超过则结束当前循环直接进入catch
if (!flowControl.checkQps()) {
throw new OverFlowException(false);
}
}
} else { //2.如果链接失效,先删除本地失效的路由,再查下远程路由,看用户是否登陆到其他机器
Logs.PUSH.warn("[Broadcast] find router in local but conn disconnect, message={}, conn={}", message, connection);
//删除已经失效的本地路由
mPushServer.getRouterCenter().getLocalRouterManager().unRegister(userId, clientType);
}
}
});
});
} catch (OverFlowException e) {
//超出最大限制,或者遍历完毕,结束广播
return e.isOverMaxLimit() || !iterator.hasNext();
}
return !iterator.hasNext();//遍历完毕, 广播结束
}
private void report() {
Logs.PUSH.info("[Broadcast] task finished, cost={}, message={}", (System.currentTimeMillis() - begin), message);
mPushServer.getPushCenter().getPushListener().onBroadcastComplete(message, timeLine.end().getTimePoints());//通知发送方,广播推送完毕
}
private boolean checkCondition(Condition condition, Connection connection) {
if (condition == AwaysPassCondition.I) return true;
SessionContext sessionContext = connection.getSessionContext();
Map<String, Object> env = new HashMap<>();
env.put("userId", sessionContext.userId);
env.put("tags", sessionContext.tags);
env.put("clientVersion", sessionContext.clientVersion);
env.put("osName", sessionContext.osName);
env.put("osVersion", sessionContext.osVersion);
return condition.test(env);
}
//@Override
private void operationComplete(ChannelFuture future, String userId) throws Exception {
if (future.isSuccess()) {//推送成功
successUserIds.add(userId);
Logs.PUSH.info("[Broadcast] push message to client success, userId={}, message={}", message.getUserId(), message);
} else {//推送失败
Logs.PUSH.warn("[Broadcast] push message to client failure, userId={}, message={}, conn={}", message.getUserId(), message, future.channel());
}
if (finishTasks.decrementAndGet() == 0) {
report();
}
}
public ScheduledExecutorService getExecutor() {
return ((Message) message).getConnection().getChannel().eventLoop();
}
}